package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * User bounce ("jump-out") detail job.
 *
 * <p>Reads page-log records from the Kafka topic {@code dwd_topic_page}, keys them by
 * {@code common.mid}, and uses Flink CEP to find session-entry pages (records whose
 * {@code page.last_page_id} is null or empty) that are not followed by another page
 * event within 10 seconds. Those entry records are emitted as JSON strings to the Kafka
 * topic {@code dwm_user_jump_details}.
 *
 * <p>Abridged example of an input record (only the fields this job reads are shown in full):
 * <pre>
 * {
 *   "common": { "mid": "mid_12", "uid": "14", "ch": "web", "is_new": "0", ... },
 *   "page":   { "page_id": "home", "during_time": 4844 },
 *   "displays": [ { "display_type": "activity", "item": "2", ... }, ... ],
 *   "ts": 1618070110000
 * }
 * </pre>
 */
public class UserJumpDetailApp {

    /** Kafka topic the page logs are consumed from. */
    private static final String SOURCE_TOPIC = "dwd_topic_page";
    /** Kafka consumer group used by this job. */
    private static final String GROUP_ID = "UserJumpDetailApp";
    /** Kafka topic the bounce records are written to. */
    private static final String SINK_TOPIC = "dwm_user_jump_details";
    /** Location where checkpoints are stored. */
    private static final String CHECKPOINT_URI = "hdfs://hadoop104:8020/gmall/flink/checkpoint";
    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5_000L;
    /** A checkpoint that takes longer than this (one minute) is discarded. */
    private static final long CHECKPOINT_TIMEOUT_MS = 60_000L;
    /** How long to wait for a follow-up page before the entry page counts as a bounce. */
    private static final long JUMP_TIMEOUT_SECONDS = 10L;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment setup.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): with forMonotonousTimestamps() below, source subtasks that receive no
        // data may hold back the watermark - confirm the topic has at least 4 partitions.
        env.setParallelism(4);
        configureCheckpointing(env);

        // 2. Read the page logs from Kafka.
        FlinkKafkaConsumer<String> kafkaSource = MyKafka.getFlinkKafkaConsumer(SOURCE_TOPIC, GROUP_ID);
        DataStreamSource<String> kafkaDStream = env.addSource(kafkaSource);

        // Sample data for local testing (use in place of the Kafka source):
        /* DataStream<String> dataStream = env
                .fromElements(
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"home\"},\"ts\":15000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"detail\"},\"ts\":30000} "
                );*/

        // 2.1 Parse each record into a JSONObject.
        SingleOutputStreamOperator<JSONObject> jsonObjDStream =
                kafkaDStream.map(r -> JSONObject.parseObject(r));

        // 2.2 Assign event-time timestamps from the "ts" field and generate watermarks.
        SingleOutputStreamOperator<JSONObject> jsonObjWithWaterMarkDS = jsonObjDStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject jsonObject, long recordTimestamp) {
                                        return jsonObject.getLong("ts");
                                    }
                                })
                );

        // 2.3 Key by device id so the pattern is evaluated per "mid".
        KeyedStream<JSONObject, String> keyedByMid =
                jsonObjWithWaterMarkDS.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // 3. Apply the CEP pattern to the keyed stream.
        PatternStream<JSONObject> patternDStream = CEP.pattern(keyedByMid, buildJumpPattern());

        // The records of interest are the partial matches that time out (an entry page with
        // no follow-up). They are emitted as String because they go straight to Kafka.
        OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<String> flatResult = patternDStream.flatSelect(
                outputTag,
                // Timed-out partial matches: emit the entry page(s) as bounce records.
                new PatternFlatTimeoutFunction<JSONObject, String>() {
                    @Override
                    public void timeout(Map<String, List<JSONObject>> pattern,
                            long timeoutTimestamp,
                            Collector<String> out) throws Exception {
                        for (JSONObject entryPage : pattern.get("first")) {
                            out.collect(entryPage.toJSONString());
                        }
                    }
                },
                // Complete matches: the entry page was followed by another page, so this is
                // not a bounce and nothing is emitted.
                new PatternFlatSelectFunction<JSONObject, String>() {
                    @Override
                    public void flatSelect(Map<String, List<JSONObject>> pattern,
                            Collector<String> out) throws Exception {
                        // Intentionally empty.
                    }
                }
        );

        // Timed-out results are delivered on the side output.
        DataStream<String> timeoutDS = flatResult.getSideOutput(outputTag);

        timeoutDS.print(">>>>>>>");

        // 4. Write the bounce records to Kafka.
        timeoutDS.addSink(MyKafkaPro.getFlinkKafkaProducer(SINK_TOPIC));

        env.execute();
    }

    /**
     * Applies checkpointing, state backend and restart settings to the environment.
     *
     * @param env the execution environment to configure
     */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        // Checkpoint every 5 s with exactly-once guarantees.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint must complete within one minute, otherwise it is discarded.
        // (This was previously 6000 ms, i.e. 6 s, contradicting the stated one-minute limit.)
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // User for HDFS access; set before the state backend is configured so it is
        // already in place when the file system is first used.
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));
        // Restart at most 3 times, waiting 3 s between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
    }

    /**
     * Builds the CEP pattern: an entry page ("first": {@code last_page_id} null or empty)
     * immediately followed by a record with a non-empty {@code page_id} ("second"), within
     * 10 seconds. A partial match that times out on "first" represents a bounce.
     *
     * <p>NOTE(review): "second" accepts any page, including another entry page. An entry
     * page followed within 10 s by a new entry page for the same mid is therefore treated
     * as a complete match and is not reported as a bounce - confirm this is intended.
     *
     * @return the pattern to apply to the stream keyed by mid
     */
    private static Pattern<JSONObject, JSONObject> buildJumpPattern() {
        return Pattern.<JSONObject>begin("first").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) {
                        // Entry page: there is no previous page in the session.
                        String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                        return lastPageId == null || lastPageId.isEmpty();
                    }
                }
        ).next("second").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) {
                        // Only checks that a follow-up page exists; the timeout case is
                        // handled by within(...) and the timeout function.
                        String pageId = jsonObject.getJSONObject("page").getString("page_id");
                        return pageId != null && !pageId.isEmpty();
                    }
                }
        ).within(Time.seconds(JUMP_TIMEOUT_SECONDS));
    }

}
